package com.atguigu.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.realtime.app.BaseApp;
import com.atguigu.realtime.util.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Flink job that detects "jump" visits: a device opens an entry page (a page
 * event without {@code last_page_id}) and then either opens another entry page
 * right after it, or produces no directly following event at all within
 * {@link #JUMP_WINDOW_SECONDS} seconds.
 *
 * <p>In both cases the first entry event is written, as a JSON string, to the
 * sink obtained from {@code MyKafkaUtil.getKafkaSink("dwm_user_jump_detail")}.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/4/20 11:07
 */
public class DWMUserJumpDetailApp_1 extends BaseApp {
    /** Name of the first pattern step (the entry page of a visit). */
    private static final String ENTER_STEP = "enter";
    /** Name of the second pattern step (the event directly following the entry). */
    private static final String NEXT_STEP = "next";
    /** Maximum event-time span, in seconds, between the two pattern steps. */
    private static final long JUMP_WINDOW_SECONDS = 20;
    /** Allowed event-time out-of-orderness, in seconds, used for watermarks. */
    private static final long MAX_OUT_OF_ORDERNESS_SECONDS = 2;
    
    /**
     * Side-output tag for timed-out partial matches. It is declared once so the
     * tag handed to {@code select} and the tag used to read the side output
     * cannot diverge.
     */
    private static final OutputTag<JSONObject> JUMP_TAG = new OutputTag<JSONObject>("jump") {};
    
    /**
     * Starts the job. The arguments are forwarded unchanged to
     * {@code BaseApp.init}; their meaning is defined there.
     */
    public static void main(String[] args) {
        new DWMUserJumpDetailApp_1().init(3002, 2, "DWMUserJumpDetailApp_1", "DWMUserJumpDetailApp_1", "dwd_page");
    }
    
    /**
     * Builds the jump-detection pipeline on top of the raw page-log stream.
     *
     * @param env          the execution environment (not used directly here)
     * @param sourceStream page-log records as JSON strings
     */
    @Override
    protected void run(StreamExecutionEnvironment env,
                       DataStreamSource<String> sourceStream) {
        KeyedStream<JSONObject, String> jsonObjKS = sourceStream
            .map(JSON::parseObject)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(MAX_OUT_OF_ORDERNESS_SECONDS))
                    .withTimestampAssigner((obj, recordTs) -> {
                        // A record without "ts" would otherwise fail on auto-unboxing;
                        // fall back to the timestamp already attached to the record.
                        Long eventTs = obj.getLong("ts");
                        return eventTs != null ? eventTs : recordTs;
                    })
            )
            .keyBy(obj -> obj.getJSONObject("common").getString("mid"));
        
        // 1. Define the pattern: an entry page directly followed by another entry page.
        Pattern<JSONObject, JSONObject> pattern = Pattern
            .<JSONObject>begin(ENTER_STEP)
            .where(new EntryPageCondition())
            .next(NEXT_STEP)
            .where(new EntryPageCondition())
            .within(Time.seconds(JUMP_WINDOW_SECONDS));
        
        // 2. Apply the pattern to the keyed stream.
        PatternStream<JSONObject> ps = CEP.pattern(jsonObjKS, pattern);
        
        // 3. Extract both the complete matches and the timed-out partial matches.
        //    Either way the first entry event is the jump record.
        SingleOutputStreamOperator<JSONObject> result = ps.select(
            JUMP_TAG,
            new PatternTimeoutFunction<JSONObject, JSONObject>() {
                @Override
                public JSONObject timeout(Map<String, List<JSONObject>> map,
                                          long timeoutTimestamp) throws Exception {
                    return firstEnterEvent(map);
                }
            },
            new PatternSelectFunction<JSONObject, JSONObject>() {
                @Override
                public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                    return firstEnterEvent(map);
                }
            }
        );
        
        // 4. Merge matches with timeouts and write them out as JSON strings.
        result
            .union(result.getSideOutput(JUMP_TAG))
            .map(obj -> JSON.toJSONString(obj))
            .addSink(MyKafkaUtil.getKafkaSink("dwm_user_jump_detail"));
    }
    
    /**
     * Returns the event bound to the {@link #ENTER_STEP} step of a (partial) match.
     */
    private static JSONObject firstEnterEvent(Map<String, List<JSONObject>> match) {
        return match.get(ENTER_STEP).get(0);
    }
    
    /**
     * Accepts page events that have no previous page, i.e. whose
     * {@code page.last_page_id} is missing or empty. Events without a
     * {@code page} object are rejected instead of raising a
     * {@link NullPointerException}.
     */
    private static final class EntryPageCondition extends SimpleCondition<JSONObject> {
        private static final long serialVersionUID = 1L;
        
        @Override
        public boolean filter(JSONObject value) throws Exception {
            JSONObject page = value.getJSONObject("page");
            if (page == null) {
                return false;
            }
            String lastPageId = page.getString("last_page_id");
            return lastPageId == null || lastPageId.isEmpty();
        }
    }
}
